Fivetran 2024年2月新機能:Fivetranからdbt Cloudのジョブをトリガーで実行できるようになりました #Fivetran

Fivetran 2024年2月新機能:Fivetranからdbt Cloudのジョブをトリガーで実行できるようになりました #Fivetran

Clock Icon2024.02.05

こんにちは、スズです。

2024年2月に、Fivetranとdbt Cloudの連携機能が発表されました。

Introducing our newest integration with dbt Cloud. 🔥 Now, you can orchestrate #dbtCloud jobs directly from Fivetran’s platform, unifying #extraction, #loading, and #transformation all in one place. Less effort, more efficiency.

Learn more: https://t.co/56XWQMD13U

— Fivetran (@fivetran) February 1, 2024

この新機能により、FivetranのTransformationからdbt Cloudと連携し、dbt CloudのジョブをFivetranからトリガーで実行できるようになりました。早速試してみましたので、本記事でご紹介いたします。
※本機能は2024年2月時点でベータ版となっております。

事前準備

Fivetranの設定

Destination

データの同期先となるDestinationとして今回はSnowflakeを使用します。SnowflakeをDestinationに設定する方法は、以下のブログにてご紹介しております。

Connector

データの同期元はGoogleスプレッドシートを使用します。ConnectorにGoogleスプレッドシートを設定する方法は、以下のブログにてご紹介しております。

今回は使用するデータをそれぞれGoogleスプレッドシートにアップロードし、Connectorで接続しています。

使用するデータ

今回使用するデータは、dbt公式のドキュメント『Quickstart for dbt Cloud and Snowflake』にあるデータを使用しています。

GoogleスプレッドシートからSnowflakeにロードしたデータは、FIVETRAN_DATABASEデータベースの配下にあります。JAFFLE_SHOPスキーマにあるCUSTOMERSORDERSのテーブルが、今回dbt Cloudを使ったデータの変換に使用する元のデータとなります。

CUSTOMERSテーブルのデータは以下のようになっています。

ORDERSテーブルのデータは以下のようになっています。

dbt Cloudの設定

プロジェクトの作成

dbt Cloudでプロジェクトを作成し、データ変換とジョブを定義します。プロジェクトは、上記の「使用するデータ」と同様に、dbt公式のドキュメント『Quickstart for dbt Cloud and Snowflake』の内容を使用しています。今回使用するdbt Cloudのプロジェクトでは、CUSTOMERSテーブルからSTG_CUSTOMERSテーブル、ORDERSテーブルからSTG_ORDERSテーブルを作成し、さらにSTG_CUSTOMERSテーブルとSTG_ORDERSテーブルからCUSTOMERSテーブルを新しく作成する、ということを行っています。

  • motels/stg_customers.sql
select
    id as customer_id,
    first_name,
    last_name

from {{ source('jaffle_shop', 'customers') }}
  • motels/stg_orders.sql
select
    id as order_id,
    user_id as customer_id,
    order_date,
    status

from {{ source('jaffle_shop', 'orders') }}
  • motels/customers.sql
with customers as (

    select * from {{ ref('stg_customers') }}

),

orders as (

    select * from {{ ref('stg_orders') }}

),

customer_orders as (

    select
        customer_id,

        min(order_date) as first_order_date,
        max(order_date) as most_recent_order_date,
        count(order_id) as number_of_orders

    from orders

    group by 1

),

final as (

    select
        customers.customer_id,
        customers.first_name,
        customers.last_name,
        customer_orders.first_order_date,
        customer_orders.most_recent_order_date,
        coalesce(customer_orders.number_of_orders, 0) as number_of_orders

    from customers

    left join customer_orders using (customer_id)

)

select * from final

その他の設定として、dbt Cloudで変換したデータの出力先は、DBT_FIVETRAN_TESTという名前のデータベースを指定しています。また、dbt buildコマンドを実行するジョブを作成しています。Fivetranからトリガーでこのジョブを実行することを想定しています。

Service Tokenの取得

Fivetranとの連携のために、dbt CloudのService Tokenを使用します。Service Tokenを取得するには管理者権限が必要となります。詳細はdbt Cloudのドキュメントをご参照ください。

なお、今回はdbt CloudのProfile settings(Account settings > Perfonal profile)にてAPI Keyを取得して使用しています。

Transformationの設定

ここからFivetranのTransformationでdbt Cloudと連携してみます。Fivetranの左ペインの[Transformations]から、Transformationを設定する対象のDestinationを選択します。

Transformationを何で行うかを選択します。今回はdbt Cloud projectを選択します。

Fivetranからdbt Cloudにアクセスするための設定を行います。

  • Region: dbt Cloudのリージョン
    URLがhttps://cloud.getdbt.comの場合はUS regionhttps://emea.dbt.comの場合はEurope region
  • Service Token: dbt Cloudのサービストークン
  • Account: dbt Cloudのアカウントを選択
  • Project: dbt Cloudのプロジェクトを選択

[Service Token]を入力した後、[Authenticate]をクリックし認証に成功すると、[Account]が表示されドロップダウンで選択できるようになります。選択した[Account]にあわせて、[Project]を選択できるようになります。[Project]を選択後、[Check project]をクリックしてチェックしたのち、[Save & Test]をクリックして先に進みます。

テストに成功しました。[Transformations]をクリックします。

ここまででFivetranとdbt Cloudの接続設定が完了しました。次に、[Add Transformation] > [dbt Cloud]をクリックしてトリガーで実行するジョブを設定していきます。

Select jobにあるプルダウンメニューから対象のジョブを選択します。

ジョブの実行方法として[Integrated]または[Custom]のどちらかを選びます。[Custom]の場合、ジョブを実行する曜日や頻度、実行時刻を設定できます。

今回は[Integrated]を選択します。Integratedの場合、Integratedに設定したConnectorの同期にあわせてジョブが実行されます。今回は「jaffle_shop.customers」というConnectorを設定しています。設定後、[Save & Run]をクリックします。

Transformationsの画面に戻り、先ほど設定したTransformationが追加されていることが確認できます。この時点ではジョブは実行されておらず、StatusがPendingになっています。

Connectorを同期してみる

Transformationの設定が完了しましたので、Connectorを同期し、ジョブが実行されるかを確認します。Transformationに設定したConnectorを表示し、[SYNC NOW]をクリックして同期を実行します。

しばらくしてTransformationを確認すると、StatusがSucceededと表示されています。

dbt Cloudでもジョブが実行されていることが確認できました。

Snowflakeを確認すると、FIVETRAN_DBT_TRANSFORMATIONスキーマの配下にCUSTOMERSSTG_CUSTOMERSSTG_ORDERSのテーブルが作成され、dbtで変換したデータが出力されていました。

最後に

Fivetranとdbt Cloudの連携機能をご紹介しました。この新機能により、データのロードからdbt Cloudでのデータの変換までのスケジュールをFivetranで調整できるようになりました。1つのプラットフォーム上でスケジュール管理でき、便利な機能ではないでしょうか。

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.